-
Notifications
You must be signed in to change notification settings - Fork 260
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support to scheduler message #3487
base: master
Are you sure you want to change the base?
Conversation
lillo42
commented
Jan 20, 2025
•
edited
Loading
edited
- Remove unnecessary code
- Fixes build
- Add Unit test
- Finish implementation
- Add Sample
- Implement support to HangFire
- Implement support to Quartz
- Implement support to AWS Scheduler
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❌ Code Health Quality Gates: FAILED
Change in average Code Health of affected files: -0.02 (8.17 -> 8.15)
-
Declining Code Health: 4 findings(s) 🚩
-
Affected Hotspots: 2 files(s) 🔥
InstrumentationOptions instrumentationOptions = InstrumentationOptions.All, | ||
IAmAMessageSchedulerFactory? messageSchedulerFactory = null) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ℹ Getting worse: Constructor Over-Injection
CommandProcessor increases from 8 to 9 arguments, threshold = 5
InstrumentationOptions instrumentationOptions = InstrumentationOptions.All, | ||
IAmAMessageSchedulerFactory? messageSchedulerFactory = null) | ||
: this(subscriberRegistry, handlerFactory, requestContextFactory, policyRegistry, featureSwitchRegistry, inboxConfiguration, messageSchedulerFactory: messageSchedulerFactory) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ℹ Getting worse: Constructor Over-Injection
CommandProcessor increases from 11 to 12 arguments, threshold = 5
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❌ Code Health Quality Gates: FAILED
Change in average Code Health of affected files: -0.02 (8.17 -> 8.15)
-
Declining Code Health: 4 findings(s) 🚩
-
Affected Hotspots: 2 files(s) 🔥
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❌ Code Health Quality Gates: FAILED
Change in average Code Health of affected files: -0.16 (8.69 -> 8.53)
- Declining Code Health: 12 findings(s) 🚩
- Improving Code Health: 2 findings(s) ✅
- Affected Hotspots: 7 files(s) 🔥
@@ -37,6 +37,7 @@ THE SOFTWARE. */ | |||
using Paramore.Brighter.FeatureSwitch; | |||
using Paramore.Brighter.Logging; | |||
using Paramore.Brighter.Observability; | |||
using Paramore.Brighter.Tasks; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❌ Getting worse: Code Duplication
introduced similar code in: SchedulerPost,SchedulerPost,SchedulerPostAsync,SchedulerPostAsync
@@ -37,6 +37,7 @@ THE SOFTWARE. */ | |||
using Paramore.Brighter.FeatureSwitch; | |||
using Paramore.Brighter.Logging; | |||
using Paramore.Brighter.Observability; | |||
using Paramore.Brighter.Tasks; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ℹ Getting worse: Missing Arguments Abstractions
The average number of function arguments increases from 4.12 to 4.33, threshold = 4.00
public async Task<string> SchedulerPostAsync<TRequest>(TRequest request, | ||
TimeSpan delay, | ||
RequestContext? requestContext = null, | ||
Dictionary<string, object>? args = null, | ||
bool continueOnCapturedContext = true, | ||
CancellationToken cancellationToken = default) where TRequest : class, IRequest | ||
{ | ||
if (_messageSchedulerFactory == null) | ||
{ | ||
throw new InvalidOperationException("No message scheduler factory defined."); | ||
} | ||
|
||
s_logger.LogInformation("Scheduling a request: {RequestType} {Id}", request.GetType(), request.Id); | ||
|
||
var span = _tracer?.CreateSpan(CommandProcessorSpanOperation.Scheduler, request, requestContext?.Span, options: _instrumentationOptions); | ||
var context = InitRequestContext(span, requestContext); | ||
|
||
try | ||
{ | ||
Message message = await s_mediator!.CreateMessageFromRequestAsync(request, context, cancellationToken); | ||
|
||
var scheduler = _messageSchedulerFactory.Create(this); | ||
return scheduler switch | ||
{ | ||
IAmAMessageSchedulerAsync async => await async.ScheduleAsync(message, delay, cancellationToken).ConfigureAwait(continueOnCapturedContext), | ||
IAmAMessageSchedulerSync sync => sync.Schedule(message, delay), | ||
_ => throw new InvalidOperationException("Message scheduler must be sync or async") | ||
}; | ||
} | ||
catch (Exception e) | ||
{ | ||
_tracer?.AddExceptionToSpan(span, [e]); | ||
throw; | ||
} | ||
finally | ||
{ | ||
_tracer?.EndSpan(span); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ℹ New issue: Excess Number of Function Arguments
SchedulerPostAsync has 6 arguments, threshold = 4
public async Task<string> SchedulerPostAsync<TRequest>(TRequest request, | ||
DateTimeOffset at, | ||
RequestContext? requestContext = null, | ||
Dictionary<string, object>? args = null, | ||
bool continueOnCapturedContext = true, | ||
CancellationToken cancellationToken = default) | ||
where TRequest : class, IRequest | ||
{ | ||
if (_messageSchedulerFactory == null) | ||
{ | ||
throw new InvalidOperationException("No message scheduler factory defined."); | ||
} | ||
|
||
s_logger.LogInformation("Scheduling a request: {RequestType} {Id}", request.GetType(), request.Id); | ||
|
||
var span = _tracer?.CreateSpan(CommandProcessorSpanOperation.Scheduler, request, requestContext?.Span, options: _instrumentationOptions); | ||
var context = InitRequestContext(span, requestContext); | ||
|
||
try | ||
{ | ||
Message message = await s_mediator!.CreateMessageFromRequestAsync(request, context, cancellationToken); | ||
|
||
var scheduler = _messageSchedulerFactory.Create(this); | ||
return scheduler switch | ||
{ | ||
IAmAMessageSchedulerAsync async => await async.ScheduleAsync(message, at, cancellationToken).ConfigureAwait(continueOnCapturedContext), | ||
IAmAMessageSchedulerSync sync => sync.Schedule(message, at), | ||
_ => throw new InvalidOperationException("Message scheduler must be sync or async") | ||
}; | ||
} | ||
catch (Exception e) | ||
{ | ||
_tracer?.AddExceptionToSpan(span, [e]); | ||
throw; | ||
} | ||
finally | ||
{ | ||
_tracer?.EndSpan(span); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ℹ New issue: Excess Number of Function Arguments
SchedulerPostAsync has 6 arguments, threshold = 4
InstrumentationOptions instrumentationOptions = InstrumentationOptions.All, | ||
IAmAMessageSchedulerFactory? messageSchedulerFactory = null) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ℹ Getting worse: Constructor Over-Injection
CommandProcessor increases from 8 to 9 arguments, threshold = 5
@@ -144,13 +149,21 @@ public async Task SendWithDelayAsync(Message message, TimeSpan? delay, Cancellat | |||
|
|||
_pendingConfirmations.TryAdd(await Channel.GetNextPublishSequenceNumberAsync(cancellationToken), message.Id); | |||
|
|||
if (DelaySupported) | |||
if (delay == TimeSpan.Zero || DelaySupported) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❌ New issue: Complex Method
SendWithDelayAsync has a cyclomatic complexity of 10, threshold = 9
@@ -144,13 +149,21 @@ public async Task SendWithDelayAsync(Message message, TimeSpan? delay, Cancellat | |||
|
|||
_pendingConfirmations.TryAdd(await Channel.GetNextPublishSequenceNumberAsync(cancellationToken), message.Id); | |||
|
|||
if (DelaySupported) | |||
if (delay == TimeSpan.Zero || DelaySupported) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❌ New issue: Bumpy Road Ahead
SendWithDelayAsync has 2 blocks with nested conditional logic. Any nesting of 2 or deeper is considered. Threshold is one single, nested block per function
/// <returns>Task.</returns> | ||
public async Task SendWithDelayAsync(Message message, TimeSpan? delay, CancellationToken cancellationToken = default) | ||
{ | ||
await SendAsync(message, cancellationToken); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ℹ Getting worse: Code Duplication
introduced similar code in: SendWithDelay,SendWithDelayAsync
@@ -42,6 +42,7 @@ public static class BrighterSpanExtensions | |||
CommandProcessorSpanOperation.Send => "send", | |||
CommandProcessorSpanOperation.Clear => "clear", | |||
CommandProcessorSpanOperation.Archive => "archive", | |||
CommandProcessorSpanOperation.Scheduler => "scheduler", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❌ Getting worse: Complex Method
ToSpanName increases in cyclomatic complexity from 9 to 10, threshold = 9
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❌ Code Health Quality Gates: FAILED
Change in average Code Health of affected files: -0.16 (8.69 -> 8.53)
- Declining Code Health: 12 findings(s) 🚩
- Improving Code Health: 2 findings(s) ✅
- Affected Hotspots: 7 files(s) 🔥
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❌ Code Health Quality Gates: FAILED
Change in average Code Health of affected files: -0.15 (8.69 -> 8.54)
- Declining Code Health: 13 findings(s) 🚩
- Improving Code Health: 2 findings(s) ✅
- Affected Hotspots: 7 files(s) 🔥
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❌ Code Health Quality Gates: FAILED
Change in average Code Health of affected files: -0.15 (8.69 -> 8.54)
- Declining Code Health: 13 findings(s) 🚩
- Improving Code Health: 2 findings(s) ✅
- Affected Hotspots: 7 files(s) 🔥
public string Schedule(Message message, TimeSpan delay) | ||
{ | ||
var id = getOrCreateSchedulerId(message); | ||
if (s_timers.TryGetValue(id, out var timer)) | ||
{ | ||
if (onConflict == OnSchedulerConflict.Throw) | ||
{ | ||
throw new InvalidOperationException($"scheduler with '{id}' id already exists"); | ||
} | ||
|
||
timer.Dispose(); | ||
} | ||
|
||
s_timers[id] = timeProvider.CreateTimer(Execute, (processor, id, message, false), delay, TimeSpan.Zero); | ||
return id; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❌ New issue: Code Duplication
The module contains 2 functions with similar structure: Schedule,ScheduleAsync
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
First, there is a tremendous amount of work here, that has broken the back of this. So thank you so much. I think my thoughts are generally:
- Let's overload existing methods to take a DateTimeOffset scheduledAt parameter (and provide a helper method to create a scheduledAt from a TimeSpan that represents a delay) over new method names
- We have different job types: Request (from CP goes back to send, post or publish, you need to know which) and Message (from a consumer implementation, used to delay posting a message, comes back to the consumer )
- That makes your scheduler a bit more gnarly; you want an interface that the 3rd party lib support implements that offers either.
But once we have that this will solve a massive problem for us with supporting a Send at a point in time, which will be a really powerful addition in V10
```c# | ||
public interface IAmACommandProcessor | ||
{ | ||
string SchedulerSend<TRequest>(TimeSpan delay, TRequest request) where TRequest : class, IRequest; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we would be better off overloading Send with a time, than adding a new name here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alternatively, how about SendAt, PublishAt, PostAt etc.
{ | ||
if (Scheduler is IAmAMessageSchedulerAsync async) | ||
{ | ||
await async.ScheduleAsync(message, delay.Value, cancellationToken); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably best to avoid a keyword in a variable name, use asyncScheduler instead?
|
||
if (Scheduler is IAmAMessageSchedulerSync sync) | ||
{ | ||
sync.Schedule(message, delay.Value); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably have to rename to strategy used above
return; | ||
} | ||
|
||
if (Scheduler is IAmAMessageSchedulerSync sync) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we assume that an async path, always has an async scheduler, and only use the sync scheduler on a sync path (or if transport does not support sync and async, just resort to whichever matches? This allows us to make a blocking decision via the Brighter Sync Context
delay ??= TimeSpan.Zero; | ||
if (delay > TimeSpan.FromMinutes(15)) | ||
{ | ||
if (Scheduler is IAmAMessageSchedulerAsync async) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same comments here on name (async)
} | ||
} | ||
|
||
public async Task<string> SchedulerPostAsync<TRequest>(TRequest request, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The same comments apply. Overload with an At parameter over a new method convention; only one scheduler type
|
||
namespace Paramore.Brighter; | ||
|
||
public class InMemoryMessageScheduler( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Awesome, thanks for making sure we have one of these
} | ||
|
||
/// <inheritdoc cref="Dispose"/> | ||
public void Dispose() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to ensure we dispose any remaining timers here?
var message = JsonSerializer.Deserialize<Message>(obj!, JsonSerialisationOptions.Options)!; | ||
if (async) | ||
{ | ||
await processor.PostAsync(new FireSchedulerMessage { Id = id, Message = message }); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This works if our intent is always to send a message, but on a Send or Publish intent is actually just to delay Send or Publish.
delay ??= TimeSpan.Zero; | ||
if (delay != TimeSpan.Zero) | ||
{ | ||
if (Scheduler is IAmAMessageSchedulerAsync async) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
usual comments on name and pick one
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❌ Code Health Quality Gates: FAILED
Change in average Code Health of affected files: +0.01 (8.74 -> 8.75)
- Declining Code Health: 14 findings(s) 🚩
- Improving Code Health: 2 findings(s) ✅
- Affected Hotspots: 7 files(s) 🔥
@@ -0,0 +1,402 @@ | |||
using System.Collections.Concurrent; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❌ New issue: Primitive Obsession
In this module, 31.7% of all function arguments are primitive types, threshold = 30.0%
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❌ Code Health Quality Gates: FAILED
Change in average Code Health of affected files: +0.01 (8.74 -> 8.75)
- Declining Code Health: 14 findings(s) 🚩
- Improving Code Health: 2 findings(s) ✅
- Affected Hotspots: 7 files(s) 🔥
public string Schedule(Message message, DateTimeOffset at) | ||
{ | ||
var id = getOrCreateSchedulerId(message); | ||
var job = JobBuilder.Create<QuartzBrighterJob>() | ||
.WithIdentity(getOrCreateSchedulerId(message), group!) | ||
.UsingJobData("message", JsonSerializer.Serialize( | ||
new FireSchedulerMessage { Id = id, Async = false, Message = message }, | ||
JsonSerialisationOptions.Options)) | ||
.Build(); | ||
|
||
var trigger = TriggerBuilder.Create() | ||
.WithIdentity(getOrCreateSchedulerId(message) + "-trigger", group!) | ||
.StartAt(at) | ||
.Build(); | ||
|
||
var tmp = BrighterAsyncContext.Run(async () => await scheduler.ScheduleJob(job, trigger)); | ||
return id; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❌ New issue: Code Duplication
The module contains 2 functions with similar structure: Schedule,ScheduleAsync
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❌ Code Health Quality Gates: FAILED
Change in average Code Health of affected files: +0.01 (8.74 -> 8.75)
- Declining Code Health: 14 findings(s) 🚩
- Improving Code Health: 2 findings(s) ✅
- Affected Hotspots: 7 files(s) 🔥